package flink.serialize;

import org.apache.flink.connector.kafka.sink.TopicSelector;

import com.alibaba.fastjson.JSONObject;

import javax.annotation.Nullable;

/**
 * @author ：Jason
 * @date ：Created in 2023/5/5 11:59 AM
 * @description： 根据数据里面的某个字段动态的设置 topic
 * @modified By：
 * @version: 1.0
 */
public class KafkaTopicSelector implements TopicSelector<JSONObject> {
    private final String extractFiled;
    private final String topicPrefix;

    public KafkaTopicSelector(String extractFiled, @Nullable String topicPrefix) {
        this.extractFiled = extractFiled;
        this.topicPrefix = topicPrefix;
    }

    @Override
    public String apply(JSONObject element) {
        if (this.topicPrefix != null) {
            return String.format("%s_%s", this.topicPrefix, element.getString(this.extractFiled));
        } else {
            return element.getString(this.extractFiled);
        }
    }
}
